/**
 * FileName: StructuredStreamingHDFS
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/01/04 11:18
 * Description: Reads data from HDFS, applies simple processing, and writes the result back to HDFS.
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.process.chain.ProcessChain;
import cn.com.bonc.process.impl.RuleActionProcessImpl;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;


public class StructuredStreamingHDFS {

	/** Output directory on HDFS where the streamed text results are written. */
	private static final String SAVE_PATH = "hdfs://192.168.70.21:9000/test_szj/parquet7";

	/**
	 * Checkpoint directory, kept separate from the output path. The file sink keeps its
	 * own metadata under {@code <path>/_spark_metadata}; reusing the output directory as
	 * the checkpoint location mixes recovery state with sink output and can break
	 * restart recovery.
	 */
	private static final String CHECKPOINT_PATH = SAVE_PATH + "_checkpoint";

	/**
	 * Entry point: reads a text stream from the HDFS path configured under
	 * {@code Constants.HDFS_SOURCE_PATH}, runs it through the rule-processing chain,
	 * and continuously appends the filtered result to {@link #SAVE_PATH} as text files.
	 * Blocks until the streaming query terminates.
	 *
	 * @param args unused command-line arguments
	 */
	public static void main(String[] args) {

		SparkSession spark = SparkSession
				.builder()
				.appName("SparkHDFSApp")
				.getOrCreate();

		// Streaming text source on HDFS: process oldest files first ("latestFirst"=false),
		// and track files by full path rather than name only.
		Dataset<Row> hdfsDataset = spark
				.readStream()
				.option("latestFirst", "false")
				.option("fileNameOnly", "false")
				.text(ConfigurationManager.getProperty(Constants.HDFS_SOURCE_PATH));

		// Apply the configured rule/action processors to the raw lines.
		Dataset<Row> filterResultDataset = ProcessChain
				.setSourceData(hdfsDataset)
				.addProcess(new RuleActionProcessImpl())
				.execute();

		// Write back to HDFS. The file sink supports only the Append output mode;
		// OutputMode.Update() would fail at start() with
		// "Data source text does not support Update output mode".
		StreamingQuery query = filterResultDataset
				.writeStream()
				.outputMode(OutputMode.Append())
				.format("text")
				.option("checkpointLocation", CHECKPOINT_PATH)
				.option("path", SAVE_PATH)
				.start();

		// Keep the driver alive until the query terminates (or is stopped externally).
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			// No logging framework is wired up in this class; surface the failure on stderr.
			e.printStackTrace();
		} finally {
			spark.stop();
		}
	}
}
